
RHQ 4.10

Aggregation Schema Changes

Overview

This document explains schema changes that aim to reduce I/O overhead during aggregation and to handle missed aggregations. The latest RHQ release as of this writing is 4.9.0. The latest JON release is 3.2.0.

Terminology

This section establishes some terminology to make the concepts discussed easier to follow and to avoid potential confusion.

Row

Unless otherwise stated, a row refers to a CQL row. Remember though that CQL presents the wide rows of the underlying storage engine as multiple rows, so CQL rows look much more like relational rows and columns.

Partition

A partition refers to one physical row in a table or column family. A partition may be comprised of multiple CQL rows.

Primary Key

Uniquely identifies a row. It can consist of one or more columns.

Partition Key

The first column of the primary key. All rows having the same partition key are stored in the same partition and on the same node.

Clustering Columns

Any additional columns in the primary key after the partition key. Clustering columns define grouping and sorting within a partition.

Bucket

One of the metric data tables: raw_metrics, one_hour_metrics, six_hour_metrics, or twenty_four_hour_metrics.

I/O Overhead

We currently have the metrics_index table with the following schema,

CREATE TABLE metrics_index (
  bucket text,
  time timestamp,
  schedule_id int,
  PRIMARY KEY ((bucket, time), schedule_id)
);

We perform two writes when inserting raw data - one to the raw_metrics table and one to metrics_index. Suppose during the 10:00 hour we insert raw data for schedule id 100 having a timestamp of 10:02:00. The insert into metrics_index will look like,

INSERT INTO metrics_index (bucket, time, schedule_id)
VALUES ('one_hour_metrics', 10:00, 100);

The timestamp is rounded down to the start of the current time slice, 10:00. During aggregation, a single query is executed against metrics_index to obtain all of the schedule ids with raw data to be aggregated. Then for each schedule id, a query is executed against raw_metrics to fetch the data to be aggregated into 1 hour metrics.
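
To make this flow concrete, the two steps might look like the following. This is only a sketch; it assumes the raw_metrics table is keyed by schedule_id with time as a clustering column, and it uses the same informal timestamp notation as the rest of this document.

-- Step 1: find every schedule with raw data for the 10:00 time slice
SELECT schedule_id
FROM metrics_index
WHERE bucket = 'one_hour_metrics' AND time = 10:00;

-- Step 2: repeated once per schedule id returned by step 1
SELECT time, value
FROM raw_metrics
WHERE schedule_id = 100 AND time >= 10:00 AND time < 11:00;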

If the query against metrics_index returns N measurement schedule ids, then N + 1 queries are executed to aggregate raw data. In the worst case scenario where we also have to aggregate 1 hour and 6 hour data, 3N + 3 queries are executed.

This does not scale well as the number of schedules increases, because it results in a lot of I/O overhead on the storage node(s).

Missed Aggregations

There are a few scenarios in which we could fail to aggregate metric data.

Server Outage

Suppose the server goes down at 08:46 and does not come back up until 09:45. We miss aggregating data for the 08:00 to 09:00 time slice. We already check at server startup if there has been a missed aggregation. If we detect that one has been missed, aggregation for the missed time slice will run during the next scheduled aggregation.

Failed Aggregation

Suppose that the Storage Cluster goes down during aggregation when only 10% of the schedules have been processed. Aggregation should terminate immediately. The remaining 90% of the schedules will not have their data aggregated.

Late Measurement Reports

Suppose an agent loses its connection to the server at 09:30. The agent will spool measurement reports to disk. The agent reconnects at 10:15, after aggregation has finished, and sends a measurement report with data from the 09:00 hour. That data does not get aggregated.

Schema Changes

Two tables are being added - metrics_cache and metrics_cache_index. The metrics_index table will be dropped. Here is the schema.

CREATE TABLE metrics_cache (
  bucket text,
  time_slice timestamp,
  start_schedule_id int,
  schedule_id int,
  time timestamp,
  value map<int, double>,
  PRIMARY KEY ((bucket, time_slice, start_schedule_id), schedule_id, time)
);

CREATE TABLE metrics_cache_index (
  bucket text,
  time_slice timestamp,
  partition int,
  start_schedule_id int,
  time timestamp,
  schedule_ids set<int>,
  PRIMARY KEY ((bucket, time_slice, partition), start_schedule_id, time)
);

When we insert metric data, we write to raw_metrics, metrics_cache, and metrics_cache_index. metrics_cache and metrics_cache_index are only queried during aggregation and during server startup.

We store data for multiple schedules per partition in metrics_cache. We will refer to the number of schedules per partition as the partition block size (PBS). The block size is configured at server startup.

Partition block size currently cannot be modified but there are plans to support updating it dynamically at runtime.

We will look at an example of storing raw data to explain the new tables. Suppose we have the following raw data and PBS = 10.

{scheduleId: 100, timestamp: 10:18:00, value: 3.14},
{scheduleId: 221, timestamp: 10:16:00, value: 84},
{scheduleId: 366, timestamp: 10:09:00, value: 2.17}

Let’s first look at the INSERT statements for metrics_cache.

INSERT INTO metrics_cache
  (bucket, time_slice, start_schedule_id, schedule_id, time, value)
VALUES ('raw_metrics', 10:00, 100, 100, 10:18, {3: 3.14});

INSERT INTO metrics_cache
  (bucket, time_slice, start_schedule_id, schedule_id, time, value)
VALUES ('raw_metrics', 10:00, 220, 221, 10:16, {3: 84});

INSERT INTO metrics_cache
  (bucket, time_slice, start_schedule_id, schedule_id, time, value)
VALUES ('raw_metrics', 10:00, 360, 366, 10:09, {3: 2.17});

bucket identifies the historical table in which the data is stored. It will have as its value one of raw_metrics, one_hour_metrics, or six_hour_metrics depending on the type of metric data.

time_slice is derived by rounding down the timestamp to the start of the current time slice.

start_schedule_id will be determined by the values of scheduleId and PBS; in the example above the schedule ID is rounded down to a multiple of PBS (221 → 220, 366 → 360). It identifies a block of PBS consecutive schedule IDs, e.g., 100 to 109, 220 to 229, and 360 to 369.

time is the actual timestamp for the data being inserted.

value is stored as a map so that we can accommodate both raw and aggregate metrics.
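
For illustration, here is a sketch of the kind of read that aggregation can perform against metrics_cache, using the example data above and the same informal timestamp notation.

-- One read fetches the cached 10:00 raw data for every schedule in the
-- block starting at schedule id 100 (PBS = 10); rows are ordered by
-- schedule_id and time within the partition.
SELECT schedule_id, time, value
FROM metrics_cache
WHERE bucket = 'raw_metrics' AND time_slice = 10:00 AND start_schedule_id = 100;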

Here are the INSERT statements for metrics_cache_index.

INSERT INTO metrics_cache_index
  (bucket, time_slice, partition, start_schedule_id, time)
VALUES ('raw_metrics', 10:00, 0, 100, 10:00);

INSERT INTO metrics_cache_index
  (bucket, time_slice, partition, start_schedule_id, time)
VALUES ('raw_metrics', 10:00, 0, 220, 10:00);

INSERT INTO metrics_cache_index
  (bucket, time_slice, partition, start_schedule_id, time)
VALUES ('raw_metrics', 10:00, 0, 360, 10:00);

bucket is one of the historical tables and has the same value as metrics_cache.bucket.

time_slice is the time slice in which the data is inserted. This will typically be the same as metrics_cache.time_slice but may differ in the event of late measurement reports.

partition is a number between 0 and n - 1, where n is the number of partitions that we want to split the (bucket, time_slice) data across. Initially, and probably in many cases, n will be 1, so the partition value will always be zero. If and when a single partition gets very big, it can lead to a hot spot on a node. If we find that we have hit that point, we can increase n. Clients will then rotate through the partition values on writes in a round-robin fashion so that the partitions grow at a similar rate. Partitions will be merged client side during aggregation.

start_schedule_id is the same as metrics_cache.start_schedule_id.

time is the time slice in which the data was collected. Under normal circumstances time and time_slice will have the same value. They will differ though in the cases of late measurement reports and failed aggregations.

Reducing I/O Overhead

During aggregation we only fetch data from metrics_cache and not from the historical tables (under normal circumstances). For a given bucket and time slice, we execute (N / PBS) + 1 queries. If N = 1,000,000 and PBS = 10, then we execute 100,001 queries - roughly a 90% reduction in the number of queries compared with the N + 1 queries needed previously. This is made possible by storing data for multiple schedules within the same partition. The extra query is against metrics_cache_index, which tells us all of the partition blocks in metrics_cache that have data.
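
As a sketch, assuming a single index partition (partition = 0), the extra query might look like,

-- Find every partition block with raw data for the 10:00 time slice
SELECT start_schedule_id, time, schedule_ids
FROM metrics_cache_index
WHERE bucket = 'raw_metrics' AND time_slice = 10:00 AND partition = 0;

Each start_schedule_id returned then drives one read of the corresponding block in metrics_cache, like the one shown earlier, which accounts for the remaining N / PBS queries.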

TODO

Discuss trade-offs of different PBS sizes

When we finish aggregating data for a partition block, we delete it from metrics_cache as well as the corresponding block from metrics_cache_index. Failure situations are discussed in the next section.
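
The cleanup for a successfully aggregated block could look something like the following sketch, using the values from the running example.

-- Remove the aggregated block from the cache...
DELETE FROM metrics_cache
WHERE bucket = 'raw_metrics' AND time_slice = 10:00 AND start_schedule_id = 100;

-- ...and remove the corresponding entry from the index.
DELETE FROM metrics_cache_index
WHERE bucket = 'raw_metrics' AND time_slice = 10:00 AND partition = 0
  AND start_schedule_id = 100;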

Handling Missed Aggregations

Suppose aggregation for partition block 360 finishes with errors. The partition block is not deleted from metrics_cache. We still delete the block from metrics_cache_index and also perform the following insert,

INSERT INTO metrics_cache_index
  (bucket, time_slice, partition, start_schedule_id, time)
VALUES ('raw_metrics', 11:00, 0, 360, 10:00);

Note that time_slice is now 11:00. This effectively reschedules partition block 360 to be aggregated during the 11:00 hour. Deleting the row in metrics_cache_index and then reinserting into the next time slice prevents having to query across multiple partitions. Partition block 360 will be deleted during aggregation in the 11:00 hour, assuming the second attempt completes successfully.

Now let's consider a total failure in which case aggregation during the 10:00 hour is aborted prior to completion. This could happen for example if the storage cluster goes down in the middle of aggregation. When aggregation is aborted, any remaining partition blocks in metrics_cache_index are left intact. Nothing is rescheduled. If the entire aggregation run is aborted, there could be thousands of remaining partition blocks. Rather than performing a delete and insert for each one, we can instead set a flag to let the aggregation engine know that it needs to re-query the 10:00 partition during the next aggregation run.

Next we discuss late measurement reports. Suppose an agent disconnects from the server at 09:30. The agent spools measurement reports to disk. The server starts aggregation at 10:01 and finishes at 10:03. The agent reconnects at 10:04 and sends a report with data for schedule IDs 221 and 366 having timestamps of 09:45 and 09:50 respectively. The following rows are inserted into metrics_cache_index,

INSERT INTO metrics_cache_index
  (bucket, time_slice, partition, start_schedule_id, time, schedule_ids)
VALUES ('raw_metrics', 11:00, 0, 220, 09:00, {221});

INSERT INTO metrics_cache_index
  (bucket, time_slice, partition, start_schedule_id, time, schedule_ids)
VALUES ('raw_metrics', 11:00, 0, 360, 09:00, {366});

A non-empty value for schedule_ids indicates that the corresponding partition blocks in metrics_cache may have incomplete data for the time slice; consequently, these INSERTs include specific schedule IDs so that we can pull data from raw_metrics instead of metrics_cache during aggregation for these schedules. When schedule_ids is empty we pull data from metrics_cache. Note that we still need to delete the corresponding partition block from metrics_cache even when schedule_ids is non-empty.
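
For the example above, pulling schedule 221's late data might look like this sketch (again assuming raw_metrics is keyed by schedule_id and time).

-- schedule_ids was non-empty, so read the 09:00 data for schedule 221
-- directly from raw_metrics instead of from metrics_cache.
SELECT time, value
FROM raw_metrics
WHERE schedule_id = 221 AND time >= 09:00 AND time < 10:00;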

We want to cap the age of data that we ingest. Let's make it 24 hours. If the server receives measurement reports with data more than a day old, it simply drops the data. The cap can be made configurable. Without a cap, we would need to rely on TTLs or some background job that ensures old data gets purged. Putting a cap in place effectively and safely allows us to avoid the overhead incurred from using TTLs.

With a query against a single partition in metrics_cache_index, we can efficiently find all of the cache blocks with data to be aggregated for the most recently completed time slice as well as for time slices from the past 24 hours.
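
Continuing the document's own examples, the rescheduled block 360 and the late-report rows for schedules 221 and 366 were all reinserted under time_slice 11:00, so a single partition read picks them all up. A sketch,

-- One partition read returns every block scheduled for this run; the time
-- and schedule_ids columns tell us which time slice each block belongs to
-- and whether to read from raw_metrics instead of metrics_cache.
SELECT start_schedule_id, time, schedule_ids
FROM metrics_cache_index
WHERE bucket = 'raw_metrics' AND time_slice = 11:00 AND partition = 0;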

Upgrades

Schema changes are applied at installation (or upgrade) time. There could potentially be a large amount of data to migrate over to metrics_cache. This could make for a long-running, complicated upgrade process. A better solution is to only populate metrics_cache_index which should be very fast by comparison. We specify schedule_ids as we do for late measurement reports so that data is pulled from raw_metrics during the initial aggregation run. Subsequent aggregation runs will then use metrics_cache as previously described.
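
As a hypothetical example of the kind of row the installer could write (the values here are illustrative only), an existing raw measurement for schedule 221 from the 10:00 hour would be indexed the same way as a late report,

INSERT INTO metrics_cache_index
  (bucket, time_slice, partition, start_schedule_id, time, schedule_ids)
VALUES ('raw_metrics', 11:00, 0, 220, 10:00, {221});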
